sqlstore.go 9.7 KB


  1. package sqlstore
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "strings"
  10. "time"
  11. "github.com/go-sql-driver/mysql"
  12. "github.com/go-xorm/xorm"
  13. "github.com/grafana/grafana/pkg/bus"
  14. "github.com/grafana/grafana/pkg/infra/localcache"
  15. "github.com/grafana/grafana/pkg/infra/log"
  16. m "github.com/grafana/grafana/pkg/models"
  17. "github.com/grafana/grafana/pkg/registry"
  18. "github.com/grafana/grafana/pkg/services/annotations"
  19. "github.com/grafana/grafana/pkg/services/sqlstore/migrations"
  20. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
  21. "github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
  22. "github.com/grafana/grafana/pkg/setting"
  23. _ "github.com/grafana/grafana/pkg/tsdb/mssql"
  24. "github.com/grafana/grafana/pkg/util"
  25. _ "github.com/lib/pq"
  26. )
var (
	// x is the package-level xorm engine. Init still assigns it as a
	// temporary global alongside SqlStore.engine.
	x *xorm.Engine
	// dialect is the package-level SQL dialect; Init and InitTestDB keep it
	// in sync with SqlStore.Dialect.
	dialect migrator.Dialect
	// sqlog is the package-level logger named "sqlstore".
	sqlog log.Logger = log.New("sqlstore")
)
// ContextSessionName is the name associated with the database session.
// NOTE(review): presumably used as a context key by the session helpers;
// its use is not visible in this file - confirm.
const ContextSessionName = "db-session"
  33. func init() {
  34. // This change will make xorm use an empty default schema for postgres and
  35. // by that mimic the functionality of how it was functioning before
  36. // xorm's changes above.
  37. xorm.DefaultPostgresSchema = ""
  38. registry.Register(&registry.Descriptor{
  39. Name: "SqlStore",
  40. Instance: &SqlStore{},
  41. InitPriority: registry.High,
  42. })
  43. }
// SqlStore is the SQL-backed store service registered with the service
// registry. Init connects to the database, runs the migrations and wires the
// store into the bus.
type SqlStore struct {
	Cfg          *setting.Cfg             `inject:""`
	Bus          bus.Bus                  `inject:""`
	CacheService *localcache.CacheService `inject:""`

	// dbCfg holds the database settings populated by readConfig.
	dbCfg DatabaseConfig
	// engine is the xorm engine created by getEngine during Init.
	engine *xorm.Engine
	// log is the store's logger, created in Init.
	log log.Logger
	// Dialect is the migrator dialect built from engine.
	Dialect migrator.Dialect
	// skipEnsureAdmin makes Init skip ensureAdminUser; InitTestDB sets it.
	skipEnsureAdmin bool
}
  54. func (ss *SqlStore) Init() error {
  55. ss.log = log.New("sqlstore")
  56. ss.readConfig()
  57. engine, err := ss.getEngine()
  58. if err != nil {
  59. return fmt.Errorf("Fail to connect to database: %v", err)
  60. }
  61. ss.engine = engine
  62. ss.Dialect = migrator.NewDialect(ss.engine)
  63. // temporarily still set global var
  64. x = engine
  65. dialect = ss.Dialect
  66. migrator := migrator.NewMigrator(x)
  67. migrations.AddMigrations(migrator)
  68. for _, descriptor := range registry.GetServices() {
  69. sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
  70. if ok {
  71. sc.AddMigration(migrator)
  72. }
  73. }
  74. if err := migrator.Start(); err != nil {
  75. return fmt.Errorf("Migration failed err: %v", err)
  76. }
  77. // Init repo instances
  78. annotations.SetRepository(&SqlAnnotationRepo{})
  79. ss.Bus.SetTransactionManager(ss)
  80. // Register handlers
  81. ss.addUserQueryAndCommandHandlers()
  82. // ensure admin user
  83. if ss.skipEnsureAdmin {
  84. return nil
  85. }
  86. return ss.ensureAdminUser()
  87. }
  88. func (ss *SqlStore) ensureAdminUser() error {
  89. systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
  90. err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
  91. err := bus.DispatchCtx(ctx, &systemUserCountQuery)
  92. if err != nil {
  93. return fmt.Errorf("Could not determine if admin user exists: %v", err)
  94. }
  95. if systemUserCountQuery.Result.Count > 0 {
  96. return nil
  97. }
  98. cmd := m.CreateUserCommand{}
  99. cmd.Login = setting.AdminUser
  100. cmd.Email = setting.AdminUser + "@localhost"
  101. cmd.Password = setting.AdminPassword
  102. cmd.IsAdmin = true
  103. if err := bus.DispatchCtx(ctx, &cmd); err != nil {
  104. return fmt.Errorf("Failed to create admin user: %v", err)
  105. }
  106. ss.log.Info("Created default admin", "user", setting.AdminUser)
  107. return nil
  108. })
  109. return err
  110. }
  111. func (ss *SqlStore) buildExtraConnectionString(sep rune) string {
  112. if ss.dbCfg.UrlQueryParams == nil {
  113. return ""
  114. }
  115. var sb strings.Builder
  116. for key, values := range ss.dbCfg.UrlQueryParams {
  117. for _, value := range values {
  118. sb.WriteRune(sep)
  119. sb.WriteString(key)
  120. sb.WriteRune('=')
  121. sb.WriteString(value)
  122. }
  123. }
  124. return sb.String()
  125. }
  126. func (ss *SqlStore) buildConnectionString() (string, error) {
  127. cnnstr := ss.dbCfg.ConnectionString
  128. // special case used by integration tests
  129. if cnnstr != "" {
  130. return cnnstr, nil
  131. }
  132. switch ss.dbCfg.Type {
  133. case migrator.MYSQL:
  134. protocol := "tcp"
  135. if strings.HasPrefix(ss.dbCfg.Host, "/") {
  136. protocol = "unix"
  137. }
  138. cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
  139. ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
  140. if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
  141. tlsCert, err := makeCert(ss.dbCfg)
  142. if err != nil {
  143. return "", err
  144. }
  145. mysql.RegisterTLSConfig("custom", tlsCert)
  146. cnnstr += "&tls=custom"
  147. }
  148. cnnstr += ss.buildExtraConnectionString('&')
  149. case migrator.POSTGRES:
  150. host, port := util.SplitHostPortDefault(ss.dbCfg.Host, "127.0.0.1", "5432")
  151. if ss.dbCfg.Pwd == "" {
  152. ss.dbCfg.Pwd = "''"
  153. }
  154. if ss.dbCfg.User == "" {
  155. ss.dbCfg.User = "''"
  156. }
  157. cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
  158. cnnstr += ss.buildExtraConnectionString(' ')
  159. case migrator.SQLITE:
  160. // special case for tests
  161. if !filepath.IsAbs(ss.dbCfg.Path) {
  162. ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
  163. }
  164. os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
  165. cnnstr = fmt.Sprintf("file:%s?cache=%s&mode=rwc", ss.dbCfg.Path, ss.dbCfg.CacheMode)
  166. cnnstr += ss.buildExtraConnectionString('&')
  167. default:
  168. return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
  169. }
  170. return cnnstr, nil
  171. }
  172. func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
  173. connectionString, err := ss.buildConnectionString()
  174. if err != nil {
  175. return nil, err
  176. }
  177. sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
  178. engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
  179. if err != nil {
  180. return nil, err
  181. }
  182. engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
  183. engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
  184. engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
  185. // configure sql logging
  186. debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
  187. if !debugSql {
  188. engine.SetLogger(&xorm.DiscardLogger{})
  189. } else {
  190. engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
  191. engine.ShowSQL(true)
  192. engine.ShowExecTime(true)
  193. }
  194. return engine, nil
  195. }
  196. func (ss *SqlStore) readConfig() {
  197. sec := ss.Cfg.Raw.Section("database")
  198. cfgURL := sec.Key("url").String()
  199. if len(cfgURL) != 0 {
  200. dbURL, _ := url.Parse(cfgURL)
  201. ss.dbCfg.Type = dbURL.Scheme
  202. ss.dbCfg.Host = dbURL.Host
  203. pathSplit := strings.Split(dbURL.Path, "/")
  204. if len(pathSplit) > 1 {
  205. ss.dbCfg.Name = pathSplit[1]
  206. }
  207. userInfo := dbURL.User
  208. if userInfo != nil {
  209. ss.dbCfg.User = userInfo.Username()
  210. ss.dbCfg.Pwd, _ = userInfo.Password()
  211. }
  212. ss.dbCfg.UrlQueryParams = dbURL.Query()
  213. } else {
  214. ss.dbCfg.Type = sec.Key("type").String()
  215. ss.dbCfg.Host = sec.Key("host").String()
  216. ss.dbCfg.Name = sec.Key("name").String()
  217. ss.dbCfg.User = sec.Key("user").String()
  218. ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
  219. ss.dbCfg.Pwd = sec.Key("password").String()
  220. }
  221. ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
  222. ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
  223. ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
  224. ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
  225. ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
  226. ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
  227. ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
  228. ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
  229. ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
  230. ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
  231. }
// ITestDB is the minimal testing interface InitTestDB needs: marking the
// caller as a test helper and failing with a formatted message. Both
// methods have the same signatures as those on testing.TB.
type ITestDB interface {
	Helper()
	Fatalf(format string, args ...interface{})
}
  237. // InitTestDB initiliaze test DB
  238. func InitTestDB(t ITestDB) *SqlStore {
  239. t.Helper()
  240. sqlstore := &SqlStore{}
  241. sqlstore.skipEnsureAdmin = true
  242. sqlstore.Bus = bus.New()
  243. sqlstore.CacheService = localcache.New(5*time.Minute, 10*time.Minute)
  244. dbType := migrator.SQLITE
  245. // environment variable present for test db?
  246. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  247. dbType = db
  248. }
  249. // set test db config
  250. sqlstore.Cfg = setting.NewCfg()
  251. sec, _ := sqlstore.Cfg.Raw.NewSection("database")
  252. sec.NewKey("type", dbType)
  253. switch dbType {
  254. case "mysql":
  255. sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
  256. case "postgres":
  257. sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
  258. default:
  259. sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
  260. }
  261. // need to get engine to clean db before we init
  262. engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
  263. if err != nil {
  264. t.Fatalf("Failed to init test database: %v", err)
  265. }
  266. sqlstore.Dialect = migrator.NewDialect(engine)
  267. // temp global var until we get rid of global vars
  268. dialect = sqlstore.Dialect
  269. if err := dialect.CleanDB(); err != nil {
  270. t.Fatalf("Failed to clean test db %v", err)
  271. }
  272. if err := sqlstore.Init(); err != nil {
  273. t.Fatalf("Failed to init test database: %v", err)
  274. }
  275. sqlstore.engine.DatabaseTZ = time.UTC
  276. sqlstore.engine.TZLocation = time.UTC
  277. return sqlstore
  278. }
  279. func IsTestDbMySql() bool {
  280. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  281. return db == migrator.MYSQL
  282. }
  283. return false
  284. }
  285. func IsTestDbPostgres() bool {
  286. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  287. return db == migrator.POSTGRES
  288. }
  289. return false
  290. }
// DatabaseConfig holds the database settings that SqlStore.readConfig reads
// from the "database" configuration section.
type DatabaseConfig struct {
	// Type is the database type, taken from the "type" key or the scheme of
	// the "url" key; it selects the connection string format and is passed
	// to xorm.NewEngine.
	Type string
	// Host is the "host" key or the host part of the "url" key.
	Host string
	// Name is the database name: the "name" key or the first path segment
	// of the "url" key.
	Name string
	// User is the "user" key or the username of the "url" key.
	User string
	// Pwd is the "password" key or the password of the "url" key.
	Pwd string
	// Path is the SQLite database file path ("path", default
	// "data/grafana.db"); relative paths are joined with the data path.
	Path string
	// SslMode is the "ssl_mode" key.
	SslMode string
	// CaCertPath is the "ca_cert_path" key.
	CaCertPath string
	// ClientKeyPath is the "client_key_path" key.
	ClientKeyPath string
	// ClientCertPath is the "client_cert_path" key.
	ClientCertPath string
	// ServerCertName is the "server_cert_name" key.
	ServerCertName string
	// ConnectionString, when non-empty, is used verbatim instead of a
	// generated connection string.
	ConnectionString string
	// MaxOpenConn is the "max_open_conn" key (default 0).
	MaxOpenConn int
	// MaxIdleConn is the "max_idle_conn" key (default 2).
	MaxIdleConn int
	// ConnMaxLifetime is the "conn_max_lifetime" key in seconds
	// (default 14400).
	ConnMaxLifetime int
	// CacheMode is the SQLite "cache" parameter ("cache_mode", default
	// "private").
	CacheMode string
	// UrlQueryParams are the query parameters of the "url" key; they are
	// appended to the generated connection string.
	UrlQueryParams map[string][]string
}
  309. }